c22e9098934d2fdf4205aae1e0f54eedc68bac33,jdbc-lib/src/test/java/com/streamsets/pipeline/stage/origin/jdbc/table/BasicIT.java,BasicIT,testSingleTableMultipleBatches,#,245

Before Change


        .tableConfigBeans(ImmutableList.of(tableConfigBean))
        .build();

    SourceRunner runner = new SourceRunner.Builder(TableJdbcDSource.class, tableJdbcSource)
        .addOutputLane("a").build();
    runner.runInit();
    try {
      StageRunner.Output output = runner.runProduce("", 5);
      List<Record> records = output.getRecords().get("a");
      Assert.assertEquals(5, records.size());
      checkRecords(EXPECTED_CRICKET_STARS_RECORDS.subList(0, 5), records);

      output = runner.runProduce(output.getNewOffset(), 5);
      records = output.getRecords().get("a");
      Assert.assertEquals(5, records.size());
      checkRecords(EXPECTED_CRICKET_STARS_RECORDS.subList(5, 10), records);

After Change


        .tableConfigBeans(ImmutableList.of(tableConfigBean))
        .build();

    PushSourceRunner runner = new PushSourceRunner.Builder(TableJdbcDSource.class, tableJdbcSource)
        .addOutputLane("a")
        .setOnRecordError(OnRecordError.TO_ERROR)
        .build();
    runner.runInit();
    try {
      JdbcPushSourceTestCallback callback = new JdbcPushSourceTestCallback(runner, 2);
      runner.runProduce(Collections.emptyMap(), 5, callback);

      List<List<Record>> batchRecords = callback.waitForAllBatchesAndReset();

      List<Record> records = batchRecords.get(0);
      Assert.assertEquals(5, records.size());
      checkRecords(EXPECTED_CRICKET_STARS_RECORDS.subList(0, 5), records);